音视频开发 RTSP RTSP 服务器 源码理解 Edmend Zhang 2024-07-18 2024-10-31 RTSP 服务器 源码理解 Fd 文件描述符 fd (File Descriptor) 打开的文件或网络连接的一个抽象标识符。每个文件描述符通常是一个整数,在程序运行时用于引用特定的文件或套接字。
管理网络连接 :文件描述符用于表示客户端与服务器之间的网络连接。RTSP服务器会为每个连接分配一个文件描述符,以便能够区分和管理多个客户端连接。
非阻塞I/O :通过将文件描述符设置为非阻塞模式,服务器可以在无需等待数据准备好的情况下进行其他操作。这对于高效处理多个并发连接是非常重要的。
回调函数 :通过为文件描述符绑定回调函数(如 readCallback
),服务器可以在特定事件(如有数据可读)发生时自动调用这些函数,从而处理相应的I/O操作。
select
网络模型 :select
是一种多路复用技术,用于监视多个文件描述符,以查看哪些文件描述符处于可读、可写或有异常的状态。这使得服务器可以在一个线程中高效地处理多个网络连接。
如何理解 Live555 的Source & Sink
Source 发送端, 流的起点, 可直观理解为生产者, 负责读取文件或网络流的信息. Sink 接收端, 流的终点, 可理解为是消费者。
Source: 可能是RTP读取数据, 从文件中或摄像头设备中等.
Sink: 数据流最终可保存在文件中, 或显示在屏幕上等.
MediaSession: 用于表示一个RTP会话, 一个MediaSession可能包含多个子会话(MediaSubSession),子会话可以是音频子会话、视频子会话等。
(https://www.jianshu.com/p/0bdf07f7a5d5)
Source 和 sink 都有一个源头类 MediaSouce是所有Souce的基类
MediaSink是所有Sink的基类
以H264进行举例 H264VideoStreamFramer是真正的Souce,它用于从H264文件中读取数据,并组装成帧。
H264VideoFileSink是真正的Sink, 完成将数据保存至文件.
H264VideoRTPSink是真正的Sink, 完成数据的发送
1 2 3 4 5 对于H264码流,数据流的流动方向为: 服务器端: H264VideoStreamFramer ->H264Or5Fragmenter (Filter)r->H264VideoRTPSink 客户端: H264RTPSouce -> Sink
setReuseAddr setReuseAddr
函数在网络编程中用于设置套接字选项,使得端口可以在套接字关闭后立即重新使用。这对于服务器应用程序特别有用,因为它允许服务器在崩溃或重启后快速重新绑定到相同的端口,而不必等待系统默认的时间间隔(通常是几分钟)才能重新使用该端口。
具体来说,setReuseAddr
函数通过设置 SO_REUSEADDR
套接字选项来实现这一功能。
系统结构
通过SessionManger 来存储实入流 每一个流都定义为一个session
定时器发送控制信号到SessionManager以维持帧率。
SessionManager 管理会话 test
和其包含的流。
RTSP服务器接收并处理客户端连接请求。
RTSP连接处理具体的流,推送 h264 和 aac 流到客户端。
客户端通过 RTSP 客户端软件接收并播放流媒体内容。
udp传输不需要加入fd
main function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int main () { srand (time (NULL )); EventScheduler* scheduler = EventScheduler::createNew (EventScheduler::POLLER_SELECT); ThreadPool* threadPool = ThreadPool::createNew (1 ); MediaSessionManager* sessMgr = MediaSessionManager::createNew (); UsageEnvironment* env = UsageEnvironment::createNew (scheduler, threadPool); Ipv4Address rtspAddr ("127.0.0.1" , 8554 ) ; RtspServer* rtspServer = RtspServer::createNew (env, sessMgr,rtspAddr); LOGI ("----------session init start------" ); { MediaSession* session = MediaSession::createNew ("test" ); MediaSource* source = H264FileMediaSource::createNew (env, "../data/daliu.h264" ); Sink* sink = H264FileSink::createNew (env, source); session->addSink (MediaSession::TrackId0, sink); source = AACFileMeidaSource::createNew (env, "../data/daliu.aac" ); sink = AACFileSink::createNew (env, source); session->addSink (MediaSession::TrackId1, sink); sessMgr->addSession (session); } LOGI ("----------session init end------" ); rtspServer->start (); env->scheduler ()->loop (); return 0 ; }
创建RTSP Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 RtspServer::RtspServer (UsageEnvironment* env, MediaSessionManager* sessMgr, Ipv4Address& addr) : mSessMgr (sessMgr), mEnv (env), mAddr (addr), mListen (false ) { mFd = sockets::createTcpSock (); sockets::setReuseAddr (mFd, 1 ); if (!sockets::bind (mFd, addr.getIp (), mAddr.getPort ())) { return ; } LOGI ("rtsp://%s:%d fd=%d" ,addr.getIp ().data (),addr.getPort (), mFd); mAcceptIOEvent = IOEvent::createNew (mFd, this ); mAcceptIOEvent->setReadCallback (readCallback); mAcceptIOEvent->enableReadHandling (); mCloseTriggerEvent = TriggerEvent::createNew (this ); mCloseTriggerEvent->setTriggerCallback (cbCloseConnect); }
1 2 3 4 5 6 7 8 9 10 11 12 void RtspServer::handleRead () { int clientFd = sockets::accept (mFd); if (clientFd < 0 ) { LOGE ("handleRead error,clientFd=%d" ,clientFd); return ; } RtspConnection* conn = RtspConnection::createNew (this , clientFd); conn->setDisConnectCallback (RtspServer::cbDisConnect, this ); mConnMap.insert (std::make_pair (clientFd, conn)); }
回调函数调用
1 2 3 4 5 6 7 8 9 10 11 12 void RtspServer::handleRead () { int clientFd = sockets::accept (mFd); if (clientFd < 0 ) { LOGE ("handleRead error,clientFd=%d" ,clientFd); return ; } RtspConnection* conn = RtspConnection::createNew (this , clientFd); conn->setDisConnectCallback (RtspServer::cbDisConnect, this ); mConnMap.insert (std::make_pair (clientFd, conn)); }
在创建RTSP Sever实例后,只要RTSP Server的描述符出现了可读事件(readCallback)>> 执行 HandelRead(),然后创建ClientFd,监收请求的客户端描述符。当ClientFd不小于0,创建一个连接,当结束时 触发回调函数,结束连接。
1 2 3 4 5 6 7 8 9 void RtspServer::handleDisConnect (int clientFd) { std::lock_guard <std::mutex> lck (mMtx); mDisConnList.push_back (clientFd); mEnv->scheduler ()->addTriggerEvent (mCloseTriggerEvent); }
在RTSP Server里,包含triggerEvent实例 当事件加入到调度中,被处理到的时候激活函数
handleDisconnect中只是加入了队列,并没有人去执行,Trigger Event 被激活之后 回调到cbCloseConnect,然后被handleCloseConnect处理
handleCloseConnect遍历队列 清理连接
1 2 3 4 5 6 7 8 9 10 11 MediaSession* session = MediaSession::createNew ("test" ); MediaSource* source = H264FileMediaSource::createNew (env, "../data/daliu.h264" ); Sink* sink = H264FileSink::createNew (env, source); session->addSink (MediaSession::TrackId0, sink); source = AACFileMeidaSource::createNew (env, "../data/daliu.aac" ); sink = AACFileSink::createNew (env, source); session->addSink (MediaSession::TrackId1, sink); sessMgr->addSession (session);
通过source 传给sink资源
两路流 trackId0和trackId1都加入session里
sessMgr -> addSession(session)
RtspServer start
1 2 3 4 5 6 void RtspServer::start(){ LOGI(""); mListen = true; // 设置mListen sockets::listen(mFd, 60); mEnv->scheduler()->addIOEvent(mAcceptIOEvent); // 创建IO事件 加入队列循环 }
Scheduler loop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #ifdef WIN32 std::thread ([](EventScheduler* sch) { while (!sch->mQuit) { if (sch->mTimerManagerReadCallback) { sch->mTimerManagerReadCallback (sch->mTimerManagerArg); } } }, this ).detach (); #endif while (!mQuit) { handleTriggerEvents (); mPoller->handleEvent (); } } void EventScheduler::handleTriggerEvents () { if (!mTriggerEvents.empty ()) { for (std::vector<TriggerEvent*>::iterator it = mTriggerEvents.begin (); it != mTriggerEvents.end (); ++it) { (*it)->handleEvent (); } mTriggerEvents.clear (); } }
Poller 网络模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 switch (type) { case POLLER_SELECT: mPoller = SelectPoller::createNew (); break ; default : _exit(-1 ); break ; } mTimerManager = TimerManager::createNew (this );
这里需要一个定时器管理器 面对多路流传输
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 mTimerFd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (mTimerFd < 0 ) { LOGE ("create TimerFd error" ); return ; }else { LOGI ("fd=%d" ,mTimerFd); } mTimerIOEvent = IOEvent::createNew (mTimerFd, this ); mTimerIOEvent->setReadCallback (readCallback); mTimerIOEvent->enableReadHandling (); modifyTimeout (); mPoller->addIOEvent (mTimerIOEvent); #else scheduler->setTimerManagerReadCallback (readCallback, this );
setTimerManagerReadCallback
1 2 3 4 5 6 void EventScheduler::setTimerManagerReadCallback (EventCallback cb, void * arg) { mTimerManagerReadCallback = cb; mTimerManagerArg = arg; }
如果设置fps为25
定时器每一秒回调25次 40ms/t
定时器事件不为空,则去除事件计算差值,在sink里创建事件事件
1 2 3 LOGI ("Sink()" );mTimerEvent = TimerEvent::createNew (this ); mTimerEvent->setTimeoutCallback (cbTimeout);
handelTimeout
1 2 3 4 5 6 7 8 9 10 11 12 13 void Sink::cbTimeout (void *arg) { Sink* sink = (Sink*)arg; sink->handleTimeout (); } void Sink::handleTimeout () { MediaFrame* frame = mMediaSource->getFrameFromOutputQueue (); if (!frame) { return ; } this ->sendFrame (frame); mMediaSource->putFrameToInputQueue (frame); }
Task
加入线程池 每个子线程都读取一个队列 没有则阻塞等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void ThreadPool::addTask (ThreadPool::Task& task) { std::unique_lock <std::mutex> lck (mMtx); mTaskQueue.push (task); mCon.notify_one (); } void ThreadPool::loop () { while (!mQuit){ std::unique_lock <std::mutex> lck (mMtx); if (mTaskQueue.empty ()) { mCon.wait (lck); } if (mTaskQueue.empty ()) continue ; Task task = mTaskQueue.front (); mTaskQueue.pop (); task.handle (); } }
handle最终调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 void H264FileMediaSource::handleTask () { std::lock_guard <std::mutex> lck (mMtx); if (mFrameInputQueue.empty ()) return ; MediaFrame* frame = mFrameInputQueue.front (); int startCodeNum = 0 ; while (true ) { frame->mSize = getFrameFromH264File (frame->temp, FRAME_MAX_SIZE); if (frame->mSize < 0 ) { return ; } if (startCode3 (frame->temp)){ startCodeNum = 3 ; }else { startCodeNum = 4 ; } frame->mBuf = frame->temp + startCodeNum; frame->mSize -= startCodeNum; uint8_t naluType = frame->mBuf[0 ] & 0x1F ; if (0x09 == naluType) { continue ; } else if (0x07 == naluType || 0x08 == naluType) { break ; } else { break ; } } mFrameInputQueue.pop (); mFrameOutputQueue.push (frame); }